Databricks
Supported Databricks Runtime versions: 12.2 - 16.4 (Scala 2.12)
❗ Note: Databricks Serverless is not supported for this instrumentation. You may optionally use the DBT agent instead.
To enable integration with definity on Databricks, follow these steps:
- Attach the Spark Agent JAR to your compute cluster.
- Configure jobs or tasks with definity parameters.
Cluster Configuration
1. Create an Init Script
Create a script that downloads the definity Spark agent, adds it to the cluster's CLASSPATH,
and sets the default definity parameters. Save this script in cloud storage (e.g., S3); a sketch of uploading it follows the script below.
#!/bin/bash

# Download the definity Spark agent into the Databricks jars directory
JAR_DIR="/databricks/jars"
mkdir -p "$JAR_DIR"
DEFINITY_JAR_URL="https://user:[email protected]/java/definity-spark-agent-[spark.version]-[agent.version].jar"
curl -fSL -o "$JAR_DIR/definity-spark-agent.jar" "$DEFINITY_JAR_URL"
export CLASSPATH=$CLASSPATH:$JAR_DIR/definity-spark-agent.jar

# Set the default definity Spark configuration
cat > /databricks/driver/conf/00-definity.conf << EOF
spark.plugins=ai.definity.spark.plugin.DefinitySparkPlugin
spark.definity.server="https://app.definity.run"
spark.definity.api.token=YOUR_TOKEN
#spark.definity.env.name=YOUR_DEFAULT_ENV
EOF
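The script can be uploaded to cloud storage with any standard tooling. A minimal sketch using boto3, where the bucket and key are placeholders:
import boto3

# Upload the init script to the location referenced in the cluster configuration below.
# Bucket name and key are placeholders; use your own paths.
s3 = boto3.client("s3")
s3.upload_file("definity_init.sh", "your-s3-bucket", "init-scripts-dir/definity_init.sh")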
2. Attach the Init Script to Your Compute Cluster
In the Databricks UI:
- Go to Cluster configuration → Advanced options → Init Scripts.
- Add your script with:
  - Source: S3
  - File path: s3://your-s3-bucket/init-scripts-dir/definity_init.sh
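If you configure clusters through the Clusters API or an SDK rather than the UI, the same script can be attached programmatically. A minimal sketch of the relevant fields in a cluster spec; the Spark version, node type, worker count, and region are placeholder assumptions:
# Sketch: the init_scripts portion of a cluster spec (Clusters API / SDK).
# Spark version, node type, worker count, and region are placeholders.
cluster_spec = {
    "spark_version": "15.4.x-scala2.12",
    "node_type_id": "i3.xlarge",
    "num_workers": 2,
    "init_scripts": [
        {
            "s3": {
                "destination": "s3://your-s3-bucket/init-scripts-dir/definity_init.sh",
                "region": "us-east-1",
            }
        }
    ],
}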
3. Configure Spark Cluster Name (Optional)
By default, the compute name is taken from the Databricks cluster name. To override it:
Navigate to Cluster configuration → Advanced options → Spark and add:
spark.definity.compute.name my_cluster_name
Note: These settings affect the default Spark session created by the cluster. Definity will monitor this session automatically.
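To sanity-check that the init script took effect, you can read the values back from a notebook attached to the cluster (the keys match the init script above):
# Quick check from a notebook on the cluster: the plugin and server set by the
# init script should be visible on the cluster's SparkConf.
conf = spark.sparkContext.getConf()
print(conf.get("spark.plugins"))          # expect ai.definity.spark.plugin.DefinitySparkPlugin
print(conf.get("spark.definity.server"))  # expect https://app.definity.run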
Job Configuration
By default, the Databricks job name is used as the pipeline name and the task key as the task name. If needed, you can override these settings in the job configuration:
Example: Airflow Notebook Job
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

run_notebook = DatabricksSubmitRunOperator(
    task_id="run_notebook",
    json={
        "notebook_task": {
            "notebook_path": "/Users/user@org/my_notebook",
            "base_parameters": {
                "spark.definity.pipeline.name": "{{ dag_run.dag_id }}",
                "spark.definity.pipeline.pit": "{{ ts }}",
                "spark.definity.task.name": "{{ ti.task_id }}",
            },
        },
        "name": "notebook-job",
        "existing_cluster_id": "your-cluster-id",  # or provide a "new_cluster" spec
    },
)
Example: Airflow Python Job
run_python = DatabricksSubmitRunOperator(
    task_id="run_python_script",
    json={
        "spark_python_task": {
            "python_file": "dbfs:/path/to/job.py",
            "parameters": [
                "spark.definity.pipeline.name={{ dag_run.dag_id }}",
                "spark.definity.pipeline.pit={{ ts }}",
                "spark.definity.task.name={{ ti.task_id }}",
            ],
        },
        "name": "python-job",
        "existing_cluster_id": "your-cluster-id",  # or provide a "new_cluster" spec
    },
)
Example: Manual Task Scope Configuration
You can set the task scope manually in your code.
When doing so, set the following Spark config at the cluster level to disable automatic session detection:
spark.definity.databricks.automaticSessions.enabled=false
Basic Example
# Set this property to define a new task scope
spark.conf.set("spark.definity.session", f"pipeline.name={my_pipeline},pipeline.pit={pit_date},task.name={my_task}")
Advanced Example
For multiple logical tasks in a single session, unset the property when the task ends:
try {
  // your job logic here
  ...
} finally {
  // Unset the session to signal task completion (recommended in a `finally` block to catch failures)
  spark.conf.unset("spark.definity.session")
}
Note: Unsetting the session is not required for Python script jobs and notebook jobs.
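For illustration, here is how the same set/unset pattern could look in PySpark when a single session hosts multiple logical tasks. This is a sketch: the pipeline and task names are placeholders, and run_task stands in for your own job logic.
# Sketch: track two logical tasks separately within one Spark session.
# Pipeline/task names are placeholders; run_task() is your own logic.
for task_name in ["load_raw", "build_report"]:
    spark.conf.set(
        "spark.definity.session",
        f"pipeline.name=my_pipeline,pipeline.pit=2025-01-01 01:00:00,task.name={task_name}",
    )
    try:
        run_task(task_name)
    finally:
        # Unset to signal that this logical task has completed
        spark.conf.unset("spark.definity.session")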
Example: Jobs API
definity parameters can be passed via the base_parameters field (for notebook tasks) or the parameters field (for Python tasks), depending on the task type:
{
  "tasks": [
    {
      "task_key": "task1",
      "notebook_task": {
        "notebook_path": "/Workspace/Users/user@org/task_notebook_1",
        "source": "WORKSPACE",
        "base_parameters": {
          "spark.definity.pipeline.name": "my_pipeline",
          "spark.definity.pipeline.pit": "2025-01-01 01:00:00",
          "spark.definity.task.name": "task1"
        }
      },
      "existing_cluster_id": "${DATABRICKS_CLUSTER}"
    },
    {
      "task_key": "task2",
      "notebook_task": {
        "notebook_path": "/Workspace/Users/user@org/task_notebook_2",
        "source": "WORKSPACE",
        "base_parameters": {
          "spark.definity.pipeline.name": "my_pipeline",
          "spark.definity.pipeline.pit": "2025-01-01 01:00:00",
          "spark.definity.task.name": "task2"
        }
      },
      "existing_cluster_id": "${DATABRICKS_CLUSTER}"
    },
    {
      "task_key": "python_task1",
      "spark_python_task": {
        "python_file": "s3://my-bucket/python_task.py",
        "parameters": [
          "yourArg1",
          "yourArg2",
          "spark.definity.task.name=python_task_1",
          "spark.definity.pipeline.name=my_pipeline",
          "spark.definity.pipeline.pit=2025-01-01 01:00:00"
        ]
      },
      "existing_cluster_id": "${DATABRICKS_CLUSTER}"
    }
  ],
  "format": "MULTI_TASK",
  "queue": {
    "enabled": true
  }
}
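One way to create a job from this payload is to POST it to the Jobs 2.1 API. The sketch below assumes the payload is saved as definity_job.json, that ${DATABRICKS_CLUSTER} has been substituted with a real cluster ID, and that the workspace URL and token are placeholders:
import json

import requests

# Sketch: create the job defined above via the Databricks Jobs 2.1 API.
# Workspace URL, token, and file name are placeholders for your environment.
with open("definity_job.json") as f:
    payload = json.load(f)

resp = requests.post(
    "https://<your-workspace>.cloud.databricks.com/api/2.1/jobs/create",
    headers={"Authorization": "Bearer <your-databricks-token>"},
    json=payload,
)
resp.raise_for_status()
print(resp.json())  # contains the new job_id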